package com.yinxing.netty.utils;

import com.yinxing.framework.system.SystemInfo;
import com.yinxing.framework.utils.JsonUtils;
import com.yinxing.framework.utils.SpringUtils;
import com.yinxing.netty.server.ChannelMap;
import com.yinxing.webapi.code.service.sys.ISysMessageService;
import com.yinxing.webapi.code.service.sys.ISysUserService;
import com.yinxing.webapi.shiro.LoginUser;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@Slf4j
public class ServerUtils {

    private static final ChannelMap channelMap = new ChannelMap();

    private static final AttributeKey<LoginUser> userVoKey = AttributeKey.valueOf("userVoKey");

    /**
     * 把用户和channel绑定,并且把channel放入容器
     * @param sysUserId 用户ID
     * @param channel   连接通道对象
     */
    public static void channelLogin(final Long sysUserId, final Channel channel) {
        Executor executor = SpringUtils.getBean("nettyExecutor", Executor.class);
        final ISysMessageService sysMessageService = SpringUtils.getBean(ISysMessageService.class);
        final ISysUserService sysUserService = SpringUtils.getBean(ISysUserService.class);
        final String remoteHostPort = NettyUtils.getRemoteHostPort(channel);
        //查询数据库等耗时任务 不可以占用netty线程 这里我们使用线程池
        executor.execute(() -> {
            try {
                LoginUser loginUser = sysUserService.createloginUser(sysUserId, remoteHostPort);
                if (loginUser == null) {
                    log.warn("SysUserId:{{}}查询不到用户数据.关闭连接", sysUserId);
                    channel.close();
                } else {
                    log.info("SysUserId:{{}}绑定到Channel:{{}}", loginUser, remoteHostPort);
                    channel.attr(userVoKey).set(loginUser);
                    channelMap.setUserId(sysUserId, channel);

                    //推送未读消息
                    sysMessageService.pushUnReadMessageCount(sysUserId);

                    //推送服务器运行信息
                    if (loginUser.getPermissions().contains("/AppController/systemInfo")) {
                        startSendSystemInfo(channel);
                    }
                }
            } catch (Exception e) {
                log.error("SysUserId:{{}}查询用户数据错误.关闭socket连接", sysUserId, e);
                channel.close();
            }
        });
    }

    /**
     * 推送服务器运行信息任务
     * @param channel socket通道
     */
    public static void startSendSystemInfo(final Channel channel) {
        scheduleWithFixedDelay(() -> {
            Executor executor = SpringUtils.getBean("nettyExecutor", Executor.class);
            executor.execute(() -> {
                ExtPushMessage message = new ExtPushMessage(ExtPushMessage.MessageType.SYS_INFO);
                message.put("data", SystemInfo.getSystemInfo());
                String messageStr = JsonUtils.toJson(message);
                channel.writeAndFlush(new TextWebSocketFrame(messageStr));
            });
        }, channel, "推送服务器运行信息任务", 10, 5, TimeUnit.SECONDS);
    }

    /**
     * ws心跳定时任务
     * @param channel socket通道
     */
    public static void startHeartbeat(final Channel channel) {
        scheduleWithFixedDelay(() -> {
            if(channelMap.hasUser(channel)) {
                Long userId = channelMap.getUserId(channel);
                log.debug("ws心跳定时任务.推送用户:->{{}}", userId);
                channel.writeAndFlush(new PingWebSocketFrame());
            }
        }, channel, "ws心跳定时任务", 10, 10, TimeUnit.SECONDS);
    }

    /**
     * 添加周期性任务,需要在channel关闭时取消任务.
     * @param runnable 任务逻辑单元
     * @param channel socket通道
     * @param taskName 任务名称(日志使用)
     * @param initialDelay 初始延迟时间
     * @param delay 执行周期
     * @param unit 时间单元
     */
    public static void scheduleWithFixedDelay(Runnable runnable, Channel channel, String taskName,
                                              long initialDelay, long delay, TimeUnit unit) {
        ScheduledFuture future = GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(runnable, initialDelay, delay, unit);
        channel.closeFuture().addListener((ChannelFutureListener) f -> {
            log.debug("Channel:{{}}连接关闭.取消定时任务:{{}}", NettyUtils.getRemoteHostPort(channel), taskName);
            future.cancel(true);
        });
    }

    /**
     * 给用户推送消息
     * @param userId 用户ID
     * @param message 消息实体类
     * @return true:成功 false:失败
     */
    public static boolean sendMessageToUser(Long userId, ExtPushMessage message) {
        if (!channelMap.online(userId)) {
            log.warn("当前用户sysUserId:{{}}不在线.发送消息失败");
            return false;
        } else {
            String messageStr = JsonUtils.toJson(message);
            channelMap.getChannelByUserId(userId).writeAndFlush(new TextWebSocketFrame(messageStr));
            log.debug("推送消息成功.用户ID:{{}} - 消息内容:{{}}", userId, messageStr);
            return true;
        }
    }

    /**
     * 获取全部在线用户
     */
    public static List<LoginUser> getOnlileUserList() {
        return channelMap.getOnlineChannels()
            .stream()
            .filter(channel -> channel.attr(userVoKey).get() != null)
            .map(channel -> channel.attr(userVoKey).get())
            .collect(Collectors.toList());
    }

    /**
     * 发送消息至全体用户
     * @param message
     */
    public static void sendMessageToAll(ExtPushMessage message) {
        final String messageStr = JsonUtils.toJson(message);
        channelMap.getOnlineChannels().forEach(channel -> {
            try {
                if(channel.isWritable()) {
                    channel.writeAndFlush(new TextWebSocketFrame(messageStr));
                } else {
                    log.warn("Channel缓冲区已满,丢弃日志推送消息.");
                }
            } catch (Exception e) {
                log.error("发送消息失败", e);
            }
        });
    }
}
